import json
import pandas as pd
from datetime import datetime
from kafka import KafkaProducer

from air_web.config.config import config
from air_web.dw.common_fun import create_topic

# Hard-coded payload in column orientation: the i-th element of every list
# belongs to the i-th record that will be pushed.
send_dict = {
    "executeCityRange": ["51410", "51428"],
    "downIndex": ["10", "10"],
    "executeStartTime": ["2024-05-22", "2024-05-22"],
    "executeEndTime": ["2024-05-22", "2024-05-22"],
}

send_df = pd.DataFrame(send_dict)
# Add the remaining keys without a value so that every record carries them.
for col in [
    "id",
    "name",
    "code",
    "tradeCode",
    "tradeName",
    "strategyName",
]:
    send_df[col] = None

# One dict per row. "records" is the documented orient name; the abbreviated
# "record" was deprecated and is rejected with a ValueError by pandas >= 2.0.
send_list = send_df.to_dict(orient="records")

# Publish every record in send_list to Kafka as a UTF-8 encoded JSON message.
producer = KafkaProducer(
    # config["KAFKA_HOST"] is joined into one comma-separated string;
    # presumably a list of "host:port" entries -- confirm against the config.
    bootstrap_servers=",".join(config["KAFKA_HOST"])
)
topic_name = "air_condition_plan"
# NOTE(review): create_topic is a project helper; presumably it makes sure the
# topic exists before sending -- confirm against air_web.dw.common_fun.
create_topic(topic_name)
try:
    pending = []
    # Use a dedicated loop name so the module-level send_dict is not rebound.
    for record in send_list:
        message = {
            "data": record,
            "time": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
        }
        value = json.dumps(message).encode("utf-8")
        pending.append(producer.send(topic_name, value=value))
    # Flush so that every buffered message is actually handed to the brokers.
    producer.flush()
    # send() is asynchronous and flush() does not raise on delivery errors;
    # resolving each future surfaces a failed send instead of silently
    # reporting success below.
    for future in pending:
        future.get(timeout=10)
finally:
    # Always release the producer's resources, even if a send failed.
    producer.close()
print("地市启动信息推送kafka成功!num:{}".format(len(send_list)))
